/**
 * Copyright 2014 Netflix, Inc.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.internal.util;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import rx.Subscription;
import rx.functions.Func1;

/**
 * Add/Remove without object allocation (after initial construction).
 * <p>
 * This is meant for hundreds or single-digit thousands of elements that need
 * to be rapidly added and randomly or sequentially removed while avoiding object allocation.
 * <p>
 * Storage is a chain of fixed-size sections of {@link #SIZE} slots each; indexes freed by
 * {@link #remove(int)} are remembered in a second chain and handed out again by {@link #add(Object)}.
 * <p>
 * On Intel Core i7, 2.3GHz, Mac Java 8:
 * <p>
 * - adds per second single-threaded => ~32,598,500 for 100
 * - adds per second single-threaded => ~23,200,000 for 10,000
 * - adds + removes per second single-threaded => 15,562,100 for 100
 * - adds + removes per second single-threaded => 8,760,000 for 10,000
 * 
 * <pre> {@code
 * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263571.721     9856.994    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1763.417      211.998    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139850.115    17143.705    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      809.982       72.931    ops/s
 * } </pre>
 * 
 * @param <E>
 *            the type of element stored in the buffer
 */
public final class IndexedRingBuffer<E> implements Subscription {

    private static final ObjectPool<IndexedRingBuffer<?>> POOL = new ObjectPool<IndexedRingBuffer<?>>() {

        @Override
        protected IndexedRingBuffer<?> createObject() {
            return new IndexedRingBuffer<Object>();
        }

    };

    /**
     * Borrows an instance from the shared pool.
     * 
     * @param <T>
     *            the element type the caller wants to store
     * @return a buffer obtained from the pool; hand it back with {@link #releaseToPool()} or {@link #unsubscribe()}
     */
    @SuppressWarnings("unchecked")
    public static final <T> IndexedRingBuffer<T> getInstance() {
        return (IndexedRingBuffer<T>) POOL.borrowObject();
    }

    /** Head of the chain of sections holding the elements. */
    private final ElementSection<E> elements = new ElementSection<E>();
    /** Head of the chain of sections holding indexes that were freed by {@link #remove(int)}. */
    private final IndexSection removed = new IndexSection();
    /** Next never-used element index (one past the highest index handed out so far). */
    /* package for unit testing */final AtomicInteger index = new AtomicInteger();
    /** Number of freed indexes currently stored in {@link #removed}. */
    /* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();

    // default size of ring buffer
    /**
     * Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers.
     * <p>
     * The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example)
     * 
     * <pre> {@code
     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
     * 
     * 1024
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   269292.006     6013.347    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     2217.103      163.396    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139349.608     9397.232    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5     1045.323       30.991    ops/s
     * 
     * 512
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   270919.870     5381.793    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1724.436       42.287    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   141478.813     3696.030    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      719.447       75.629    ops/s
     * 
     * 
     * 256
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   272042.605     7954.982    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1101.329       23.566    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   140479.804     6389.060    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      397.306       24.222    ops/s
     * 
     * 128
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263065.312    11168.941    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      581.708       17.397    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   138051.488     4618.935    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      176.873       35.669    ops/s
     * 
     * 32
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   250737.473    17260.148    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      144.725       26.284    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   118832.832     9082.658    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5       32.133        8.048    ops/s
     * 
     * 8
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   209192.847    25558.124    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5       26.520        3.100    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   100200.463     1854.259    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        8.456        2.114    ops/s
     * 
     * 2
     * 
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5    96549.208     4427.239    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5        6.637        2.025    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5    34553.169     4904.197    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        2.159        0.700    ops/s
     * } </pre>
     * 
     * Impact of IndexedRingBuffer size on merge
     * 
     * <pre> {@code
     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
     * 
     * 512
     * 
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5282500.038   530541.761    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    49327.272     6382.189    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.025        4.724    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97395.148     2489.303    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.723        1.479    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4534067.250   116321.725    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458561.098    27652.081    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    43267.381     2648.107    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5581051.672   144191.849    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.643        4.354    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76437.644      959.748    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2965.306      272.928    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5026522.098   364196.255    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34926.819      938.612    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       33.342        1.701    ops/s
     * 
     * 
     * 128
     * 
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5144891.776   271990.561    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    53580.161     2370.204    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.265        2.236    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96634.426     1417.430    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.648        0.255    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4601280.220    53157.938    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   463394.568    58612.882    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50503.565     2394.168    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5490315.842   228654.817    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.661        3.385    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74716.169     7413.642    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3009.476      277.075    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4953313.642   307512.126    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35335.579     2368.377    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       37.450        0.655    ops/s
     * 
     * 32
     * 
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4975957.497   365423.694    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    52141.226     5056.658    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.663        2.671    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96507.893     1833.371    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.850        0.782    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4557128.302   118516.934    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   339005.037    10594.737    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50781.535     6071.787    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5604920.068   209285.840    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.413        7.496    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76098.942      558.187    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2988.137      193.255    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5177255.256   150253.086    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34772.490      909.967    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.847        0.606    ops/s
     * 
     * 8
     * 
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5027331.903   337986.410    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51746.540     3585.450    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.682        4.026    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96805.587     2868.112    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.598        0.290    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4390912.630   300687.310    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458615.731    56125.958    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    49033.105     6132.936    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5090614.100   649439.778    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       48.548        3.586    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72285.482    16820.952    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2981.576      316.140    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4993609.293   267975.397    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    33228.972     1554.924    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       32.994        3.615    ops/s
     * 
     * 
     * 2
     * 
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5103812.234   939461.192    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51491.116     3790.056    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.043        2.340    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96575.834    13416.541    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.740        0.047    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4435909.832   899133.671    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   392382.445    59814.783    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50429.258     7489.849    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5637321.803   161838.195    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       51.065        2.138    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76366.764     2631.710    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2978.302      296.418    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5280829.290  1602542.493    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35070.518     3565.672    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.501        0.991    ops/s
     * 
     * } </pre>
     */
    static int _size = 256;
    static {
        // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
        if (PlatformDependent.isAndroid()) {
            _size = 8;
        }

        // possible system property for overriding
        String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer
        if (sizeFromProperty != null) {
            try {
                int requested = Integer.parseInt(sizeFromProperty);
                if (requested > 0) {
                    _size = requested;
                } else {
                    // A size of zero would make every "% SIZE" below divide by zero and a negative
                    // size cannot be used to allocate the section arrays, so keep the default instead.
                    System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => size must be greater than zero");
                }
            } catch (NumberFormatException e) {
                System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage());
            }
        }
    }

    /** Number of slots per section; fixed once the class has been initialized. */
    /* package for unit testing */static final int SIZE = _size;

    /**
     * This resets the arrays, nulls out references and returns it to the pool.
     * This extra CPU cost is far smaller than the object allocation cost of not pooling.
     */
    public void releaseToPool() {
        // need to clear all elements so we don't leak memory
        int maxIndex = index.get();
        int realIndex = 0;
        ElementSection<E> section = elements;
        outer: while (section != null) {
            for (int i = 0; i < SIZE; i++, realIndex++) {
                if (realIndex >= maxIndex) {
                    // everything at or beyond maxIndex was never handed out, nothing left to clear
                    break outer;
                }
                // A regular volatile set() is used here. The original author noted that lazySet would be
                // acceptable because the slots are only nulled out and not accessed again:
                // (relative on Mac Intel i7) lazySet gets us ~30m vs ~26m ops/second in the JMH test (100 adds per release)
                section.array.set(i, null);
            }
            section = section.next.get();
        }

        index.set(0);
        removedIndex.set(0);
        POOL.returnObject(this);
    }

    /**
     * Equivalent to {@link #releaseToPool()}.
     */
    @Override
    public void unsubscribe() {
        releaseToPool();
    }

    /** Instances are obtained through {@link #getInstance()} only. */
    private IndexedRingBuffer() {
    }

    /**
     * Add an element and return the index where it was added to allow removal.
     * 
     * @param e
     *            the element to store
     * @return the index the element was stored at; pass it to {@link #remove(int)} to take the element out again
     */
    public int add(E e) {
        int i = getIndexForAdd();
        if (i < SIZE) {
            // fast-path when we are in the first section
            elements.array.set(i, e);
        } else {
            getElementSection(i).array.set(i % SIZE, e);
        }
        return i;
    }

    /**
     * Clears the slot at the given index and records the index so that a later {@link #add(Object)} can reuse it.
     * <p>
     * NOTE(review): the index is recorded unconditionally, so removing the same index twice records it twice.
     * 
     * @param index
     *            an index previously returned by {@link #add(Object)}
     * @return the element that was stored at {@code index}, or {@code null} if the slot was empty
     */
    public E remove(int index) {
        E e;
        if (index < SIZE) {
            // fast-path when we are in the first section
            e = elements.array.getAndSet(index, null);
        } else {
            e = getElementSection(index).array.getAndSet(index % SIZE, null);
        }
        pushRemovedIndex(index);
        return e;
    }

    /**
     * Finds the section of the removed-index chain that contains the given position,
     * creating sections along the way if they do not exist yet.
     */
    private IndexSection getIndexSection(int index) {
        // short-cut the normal case
        if (index < SIZE) {
            return removed;
        }

        // if we have passed the first array we walk the chain, one hop per full section
        int numSections = index / SIZE;
        IndexSection a = removed;
        for (int i = 0; i < numSections; i++) {
            a = a.getNext();
        }
        return a;
    }

    /**
     * Finds the section of the element chain that contains the given index,
     * creating sections along the way if they do not exist yet.
     */
    private ElementSection<E> getElementSection(int index) {
        // short-cut the normal case
        if (index < SIZE) {
            return elements;
        }

        // if we have passed the first array we walk the chain, one hop per full section
        int numSections = index / SIZE;
        ElementSection<E> a = elements;
        for (int i = 0; i < numSections; i++) {
            a = a.getNext();
        }
        return a;
    }

    /**
     * Picks the index for the next add: a previously removed index if one is available,
     * otherwise the next never-used index.
     */
    private synchronized int getIndexForAdd() {
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */
        int i;
        int ri = getIndexFromPreviouslyRemoved();
        if (ri >= 0) {
            if (ri < SIZE) {
                // fast-path when we are in the first section
                i = removed.getAndSet(ri, -1);
            } else {
                i = getIndexSection(ri).getAndSet(ri % SIZE, -1);
            }
            if (i == index.get()) {
                // if it was the last index removed, when we pick it up again we want to increment
                index.getAndIncrement();
            }
        } else {
            i = index.getAndIncrement();
        }
        return i;
    }

    /**
     * Claims the most recently pushed position of the removed-index chain.
     * 
     * @return -1 if nothing, 0 or greater if the position should be used
     */
    private synchronized int getIndexFromPreviouslyRemoved() {
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */

        // loop because of CAS
        while (true) {
            int currentRi = removedIndex.get();
            if (currentRi <= 0) {
                // nothing has been removed that could be reused
                return -1;
            }
            // claim it
            if (removedIndex.compareAndSet(currentRi, currentRi - 1)) {
                return currentRi - 1;
            }
        }
    }

    /**
     * Appends an element index to the removed-index chain so that it can be handed out again.
     */
    private synchronized void pushRemovedIndex(int elementIndex) {
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */

        int i = removedIndex.getAndIncrement();
        if (i < SIZE) {
            // fast-path when we are in the first section
            removed.set(i, elementIndex);
        } else {
            getIndexSection(i).set(i % SIZE, elementIndex);
        }
    }

    /**
     * Always reports {@code false}; no unsubscribed state is tracked by this class.
     */
    @Override
    public boolean isUnsubscribed() {
        return false;
    }

    /**
     * Same as {@link #forEach(Func1, int)} starting at index 0.
     * 
     * @param action
     *            that processes each item and returns true if it wants to continue to the next
     * @return int of next index to process, or last index seen if it exited early
     */
    public int forEach(Func1<? super E, Boolean> action) {
        return forEach(action, 0);
    }

    /**
     * Passes every non-null element to {@code action}, beginning at {@code startIndex}. When the end of the
     * used range is reached and {@code startIndex} is greater than 0, iteration wraps around and continues
     * from index 0 up to (excluding) {@code startIndex}.
     * 
     * @param action
     *            that processes each item and returns true if it wants to continue to the next
     * @param startIndex
     *            the index to begin iterating at
     * @return int of next index to process, or last index seen if it exited early
     */
    public int forEach(Func1<? super E, Boolean> action, int startIndex) {
        int endedAt = forEach(action, startIndex, index.get());
        if (startIndex > 0 && endedAt == index.get()) {
            // start at the beginning again and go up to startIndex
            endedAt = forEach(action, 0, startIndex);
        } else if (endedAt == index.get()) {
            // start back at the beginning
            endedAt = 0;
        }
        return endedAt;
    }

    /**
     * Iterates the non-null elements in {@code [startIndex, endIndex)}, additionally bounded by the current
     * value of {@link #index}.
     * 
     * @return the index of the element for which {@code action} returned false, otherwise the index at which
     *         iteration stopped (the first index that was not visited)
     */
    private int forEach(Func1<? super E, Boolean> action, int startIndex, int endIndex) {
        int maxIndex = index.get();
        int realIndex = startIndex;
        ElementSection<E> section = elements;
        int offset = startIndex;

        if (startIndex >= SIZE) {
            // move into the correct section
            section = getElementSection(startIndex);
            offset = startIndex % SIZE;
        }

        outer: while (section != null) {
            for (int i = offset; i < SIZE; i++, realIndex++) {
                if (realIndex >= maxIndex || realIndex >= endIndex) {
                    break outer;
                }
                E element = section.array.get(i);
                if (element == null) {
                    continue;
                }
                boolean continueLoop = action.call(element);
                if (!continueLoop) {
                    return realIndex;
                }
            }
            section = section.next.get();
            offset = 0; // reset to start for next section
        }

        // return the OutOfBounds index position if we processed all of them ... the one we should be less-than
        return realIndex;
    }

    /** One fixed-size segment of element storage plus a link to the following segment. */
    private static class ElementSection<E> {
        private final AtomicReferenceArray<E> array = new AtomicReferenceArray<E>(SIZE);
        private final AtomicReference<ElementSection<E>> next = new AtomicReference<ElementSection<E>>();

        /**
         * Returns the following section, creating and linking it first if it does not exist yet.
         */
        ElementSection<E> getNext() {
            ElementSection<E> existing = next.get();
            if (existing != null) {
                return existing;
            }
            ElementSection<E> newSection = new ElementSection<E>();
            if (next.compareAndSet(null, newSection)) {
                // we won
                return newSection;
            }
            // we lost so get the value that won
            return next.get();
        }
    }

    /** One fixed-size segment of removed-index storage plus a link to the following segment. */
    private static class IndexSection {

        private final AtomicIntegerArray unsafeArray = new AtomicIntegerArray(SIZE);
        private final AtomicReference<IndexSection> _next = new AtomicReference<IndexSection>();

        /**
         * Atomically stores {@code newValue} at position {@code index} and returns the value held before.
         */
        public int getAndSet(int index, int newValue) {
            return unsafeArray.getAndSet(index, newValue);
        }

        /**
         * Stores {@code elementIndex} at position {@code i}.
         */
        public void set(int i, int elementIndex) {
            unsafeArray.set(i, elementIndex);
        }

        /**
         * Returns the following section, creating and linking it first if it does not exist yet.
         */
        IndexSection getNext() {
            IndexSection existing = _next.get();
            if (existing != null) {
                return existing;
            }
            IndexSection newSection = new IndexSection();
            if (_next.compareAndSet(null, newSection)) {
                // we won
                return newSection;
            }
            // we lost so get the value that won
            return _next.get();
        }

    }

}